1 第三方依赖包的使用:
目前python作业支持两种添加依赖包的方式: 第一种:将依赖包(zip、egg)上传至大数据平台的资源库中,右键复制存储路径即可获得该资源的在HDFS上的路径, 在python作业中添加该依赖包(位于def main(sparkSession):之前),以# addPkg开头,例如:
# addPkg testPython.zip
import testPython
def main(sparkSession):
info("Hello PySpark")
info(testPython.testFunc("Python"))
sparkSession.sql("insert overwrite table test values('6', '20170801', '666')")
第二种: 在每一台cdh节点上运行pip install 命令。
2 自定义参数设置
需要自己设定spark的相关参数时,在py文件头部加上 # set key=value (#号不要去掉) 例如:
# set spark.sql.shuffle.partitions = 5
def main(sparkSession):
df = sparkSession.sql('select * from test').toDF()
df.write.parquet('/user/datacompute/platformtool/data/tao.fu')
用户执行pythno作业时,相关的读写权限受到限制,用户可以读取的路径为/user/datacompute/[projectcode]/resources/[userid]/lastest/以及 /user/datacompute/users/[userid]/, 其中projectcode为项目code,userid为用户id;用户可以写入的路径为/user/datacompute/[projectcode]/data/[userid]。具体的路径可以在资源管理中右击鼠标选择复制存储路径查看。
3 读取文件以及写入文件:
当需要读取文件时,请首先将文件上传至大数据平台中,再通过open方法读取该文件。可以将文件结果保存至大数据平台中,调用saveFile()方法,该方法的签名为saveFile(path, file, overwrite=False), 第一个参数为保存路径,第二个参数为file对象, 第3个参数为是否覆盖原来的文件。示例:以下程序读取文件,重新写入数据,并保存到大数据平台
#/bin/python3
def main(sparkSession):
file = open('test','wb+')
# file.write('testPython') #python2
file.write('testPython'.encode()) #python3
saveFile('/zhy/test.txt',file)
testdata为用户上传至大数据平台资源管理中的文件,上传过后,即可以利用open方法读取该文件。对文件写入新的内容之后,将该文件保存到大数据平台资源库的/zhy目录下,文件重新命名为testfile.txt,运行脚本之后,刷新平台的资源管理,在/zhy目录下即可看见testfile.txt 注:默认情况下根据开头注释是使用python3,若使用python2的话改成#/bin/python2,对于版本的不同,在安装依赖时需要选择对应版本的pip。
使用pandas读取xlsx文件时,采用以下步骤,首先使用open打开文件,再通过pandas读取:
import pandas as pd
def main(sparkSession):
file = open('/user/datacompute/spark_demo/resources/797/latest/test.xlsx')
pd.read_excel(file.name)
用户也可以将hdfs中的文件保存至资源管理中,用法为copyFile(src, dest),其中src为hdfs上的源文件路径,dest为待保存的资源路径,具体用法与saveFile相同 用户需要保存excel文件时,可以参考以下代码:
file = open('test.xlsx', 'wb+')
workbook = xlsxwriter.Workbook(file)
worksheet = workbook.add_worksheet('Sheet1')
worksheet.write('A1', 'Hello')
worksheet.write('A2', 'python')
workbook.close()
saveFile('/zhy/test.xlsx', file)
资源管理包括【个人文件夹】,每个用户独立,只有用户自己可见,作业如果需要写文件,往个人文件夹写入,个人文件夹hdfs上路径:/user/datacompute/[projectcode]/data/[域账号],调用savePersonalFile(file, overwrite=False),第一个参数为file对象,第二个参数为是否覆盖写入:
file = open('test.xlsx', 'wb+')
workbook = xlsxwriter.Workbook(file)
worksheet = workbook.add_worksheet('Sheet1')
worksheet.write('A1', 'Hello')
worksheet.write('A2', 'python')
workbook.close()
savePersonalFile(file)
#或者直接写文件名
savePersonalFile('test.xlsx')
saveFile和savePersonalFile的区别在存储路径上,前者存储前缀是/user/datacompute/[projectcode]/resources/[userid]/latest/,后者存储路径是/user/datacompute/users/[username]/test.xlsx
注意:通过其它方式加载文件时(np.loadtext等),先要通过open打开欲读取的文件,这样会把文件下载到本地,这样再通过其它方式去加载
import pickle
# 如果出现错误UnicodeDecodeError: ‘ascii’ codec can’t decode byte,load添加参数:encoding='iso-8859-1'
def main(sparkSession):
with open("var_fp_dict2.pkl", "rb") as pickle_file:
content = pickle.load(pickle_file)
4 pandas分析数据
首先需要了解平台上pandas版本,通过命令 pd.show_versions()获取。
4.1 数据生成
#引入pandas包
import pandas as pd
#展示pandas版本
print("pandas version:",pd.show_versions())
打开文件并通过read_csv函数转换成pandas专用的结构:dataframe进行处理
file = open('/user/datacompute/users/hongyi.zhou/Shanghai license plate price.csv')
df = pd.read_csv(file.name,index_col=0)
#输出每个列的列名以及数据类型
print("types are : ",df.dtypes)
4.2 数据展示
#展示dataframe前几行
print("head:",df.head())
#展示dataframe后几行
print("tail:",df.tail(3))
#展示dataframe的索引列,如果需要将某一列数据变成索引的话,在初始化dataframe时,指定index=列名
print("index:",df.index)
#展示dataframe的字段
print("columns:",df.columns)
#将dataframe数据以numpy的array格式输出
print("values:",df.values)
#展示数据中每个列的个数,平均值,方差,最大值等描述数据
print("describe:",df.describe())
#输出数据的转置矩阵
print("transport:",df.T)
#dataframe数据按照index排序
print("sort index:",df.sort_index(axis=1, ascending=False))
#dataframe数据按照某列名排序
print("sort val:",df.sort_values(by='Total number of applicants'))
4.3 数据获取
#获取某列名下的所有数据
print(df['Total number of applicants'])
#获取前三行数据,包含所有列
print(df[0:3])
#获取索引范围内所有数据,包含所有列
print(df['Jan-02':'May-02'])
#获取索引的数据,包含所有列
print(df.loc['Jan-02'])
#获取所有索引的数据,指定列的数据
print(df.loc[:,['Total number of license issued','lowest price']])
#获取指定索引范围,指定列的数据
print(df.loc['Jan-02':'May-02',['Total number of license issued','lowest price']])
#获取某个索列,指定列的数据
print(df.loc['Jan-02',['Total number of license issued','lowest price']])
#获取某个索引,某个列的数据
print(df.loc['Jan-02','Total number of license issued'])
4.4 多数据处理
详情参考官网文档-合并章节
4.4.1 concat
df = pd.DataFrame(np.random.randn(10, 4))
#分片
pieces = [df[:3], df[3:7], df[7:]]
#合并后和原df相同
pd.concat(pieces)
4.4.2 join
left = pd.DataFrame({'key': ['foo', 'foo'], 'lval': [1, 2]})
right = pd.DataFrame({'key': ['foo', 'foo'], 'rval': [4, 5]})
#根据key来合并两个dataframe,用笛卡尔积方式合并
pd.merge(left, right, on='key')
4.5 分组
通过"group by"可以完成以下多个任务
- 根据条件拆分数据到多个组
- 对每个组使用不同函数
- 将每个组的结果合并 详情参考官方文档-分组章节 更详细介绍请根据具体pandas版本参见pandas官网文档
#构建测试数据
df = pd.DataFrame({'A' : ['foo', 'bar', 'foo', 'bar',
'foo', 'bar', 'foo', 'foo'],
'B' : ['one', 'one', 'two', 'three',
'two', 'two', 'one', 'three'],
'C' : np.random.randn(8),
'D' : np.random.randn(8)})
#对A列数据做group操作,并求sum
df.groupby('A').sum()
#对多列(A+B)数据做group操作,并求sum
df.groupby(['A','B']).sum()
#通过自定义函数对数据做group操作
def get_letter_type(letter):
if letter.lower() in 'aeiou':
return 'vowel'
else:
return 'consonant'
grouped = df.groupby(get_letter_type, axis=1).sum()
4.6 数据透视表
pandas提供了强大的数据透视表功能来方便对dataframe数据进行重组,具体的例子可以参见官方cookbook文档
import datetime
#构建测试数据
df = pd.DataFrame({'A': ['one', 'one', 'two', 'three'] * 6,
'B': ['A', 'B', 'C'] * 8,
'C': ['foo', 'foo', 'foo', 'bar', 'bar', 'bar'] * 4,
'D': np.random.randn(24),
'E': np.random.randn(24),
'F': [datetime.datetime(2013, i, 1) for i in range(1, 13)] +
[datetime.datetime(2013, i, 15) for i in range(1, 13)]})
#构建数据透视表,采用D列作为数据,A+B作为索引列,C作为列名
pd.pivot_table(df, values='D', index=['A', 'B'], columns=['C'])
4.7 数据展示
更详细的数据展示,请参考matplotlib生成数据图表 使用pandas自带的绘图函数
ts = pd.Series(np.random.randn(1000), index=pd.date_range('1/1/2000', periods=1000))
ts = ts.cumsum()
ts.plot()
结合matplotlib.pyplot绘图
import matplotlib.pyplot as plt
df = pd.DataFrame(np.random.randn(1000, 4), index=ts.index,
columns=['A', 'B', 'C', 'D'])
#累加
df = df.cumsum()
#数据展示
plt.figure(); df.plot(); plt.legend(loc='best')